用redisson的分布式锁实现主从选举(leader election)

问题

用户数上升,服务要集群,如何实现主从机制,并且当主服务挂掉或停机维护时,其它任意从服务可自动变成主服务?

程序猿A:这还不简单,用zookeeper就行了,配上Apache curator更方便,直接帮你实现好Leader Election了。

程序猿B:嗯…zookeeper又要安装个服务,不想只为了这个主从又引入一个新东西,我们已经有redis了,能不能基于redis来实现?

答案是:yes。

思路

开始前,先说一下基本的实现思路:

  1. 先有一个redisson的分布式锁RLock,名称为:leader-lock
  2. 所有服务在启动的时候都去尝试获取leader锁
  3. 获取锁成功的服务为主服务
  4. 未获取锁的其它服务为从服务
  5. 从服务每隔几秒钟一直去尝试获取leader锁,当主服务挂掉或停机时,其中一个从服务就会获取到锁变成主服务

码起来

分布式锁的初始化
1
RLock leaderLock = redissonClient.getLock(“leader-lock”);
ElectionThread

有了锁后,我们需要一个专门的线程用于获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class ElectionThread extends Thread {

private boolean isMaster = false;

public ElectionThread() {
setName("leader-election");
}

@Override
public void run() {
while (!stop) {
try {
if (isMaster) {
synchronized (masterLock) {
//leader锁获取到了,就不需要再去获取了,进入阻塞状态
masterLock.wait();
}
} else {
//所有从服务尝试获取leader锁,尝试并等待一定时间,如果未获取成功,就一直重试
isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS);
if (isMaster) {
//leader锁获取成功,当前服务为主服务
logger.info("got leadership");
}
}
} catch (InterruptedException e) {

}
}
}

//判断leader锁是否获取成功
public boolean isMaster() {
return isMaster;
}
}
tryHold

有了ElectionThread,需要提供一个方法启动它去获取锁

1
2
3
4
5
6
public void tryHold(String leaderName) {
//分布式锁的初始化
leaderLock = redissonClient.getLock(leaderName);
//启动获取锁的线程
electionThread.start();
}
锁释放

锁获取到了,如果要释放怎么释放?ElectionThread需要加上释放锁的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class ElectionThread extends Thread {

private boolean isMaster = false;

public ElectionThread() {
setName("leader-election");
}

@Override
public void run() {
while (!stop) {
try {
if (isMaster) {
synchronized (masterLock) {
//leader锁获取到了,就不需要再去获取了,进入阻塞状态
masterLock.wait();
}
} else {
//所有从服务尝试获取leader锁,尝试并等待一定时间,如果未获取成功,就一直重试
isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS);
if (isMaster) {
//leader锁获取成功,当前服务为主服务
logger.info("got leadership");
}
}
} catch (InterruptedException e) {

}
}
//如果leader锁被当前线程占用,就释放锁
if (leaderLock.isLocked() && leaderLock.isHeldByCurrentThread()) {
leaderLock.unlock();
}

if (isMaster) {
isMaster = false;
}
}

//判断leader锁是否获取成功
public boolean isMaster() {
return isMaster;
}
}
shutdown

等等,ElectionThread是加上了释放锁的逻辑了,但当ElectionThread得到锁的时候,线程已经阻塞了,我们需要在外部唤醒ElectionThread线程并跳出while循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void shutdown() {
//stop设为true,ElectionThread中的while循环即可退出
stop = true;
try {
synchronized (masterLock) {
//唤醒ElectionThread
masterLock.notifyAll();
}
//等待ElectionThread死亡
electionThread.join();
} catch (InterruptedException e) {

}
logger.info("shutdown and give up leadership");
}
shutdown hook

有了shutdown方法后,我们再加个shutdownHook,就可以在jvm停止时调用shutdown方法,leader锁就会被释放

1
2
3
4
5
6
7
8
public void tryHold(String leaderName) {
//分布式锁的初始化
leaderLock = redissonClient.getLock(leaderName);
//启动获取锁的线程
electionThread.start();
//shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown()));
}

jvm的shutdownHook是在服务正常退出的情况下才会生效,如果服务异常退出,会怎样?leader锁会释放吗?放心,redisson有个lockWatchdogTimeout配置,这个配置会让redisson客户端处于正常状态的时候,给那些不会自动释放的锁延长过期时间,如果服务异常了,那些不会自动释放的锁由于没有延长过期时间,会被redis自动清除,所以leader锁即使在服务异常退出的情况下,也会自动释放。

lockWatchdogTimeout

lockWatchdogTimeout的配置可以用代码方式设置,也可以用配置文件方式设置,一定要注意它的单位是毫秒(我测试的时候设成了10,结果找了半天问题),默认值是30000毫秒,即30秒。

1
2
3
4
5
import org.redisson.config.Config
//从yaml配置文件中读取配置
Config config = Config.fromYAML(configFile.getInputStream());
//代码方式设置,如果希望主从切换更快,时间可以设置成5秒
config.setLockWatchdogTimeout(5000)
应用

看起来差不多了,现在我们提供一个方法用于判断当前服务是否是主服务

1
2
3
public boolean isMaster() {
return electionThread.isMaster();
}

Ok,现在可以拿来用了,首先看下定时任务的情况,定时任务一般只想在主服务上运行,这时就可以这样写了

1
2
3
4
5
6
7
scheduler.scheduleAtFixedRate(() -> {
if (!isMaster()) {
//do nothing
return;
}
//do something
}, 0, 60, TimeUnit.MINUTES);
初始化问题

这个判断方法能不能在锁状态初始化完成之前阻塞,这样在类似上面的定时任务里(如1小时)做判断时,主服务不至于因为没初始化,然后就得等到下个小时才能执行,我们修改两个地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public boolean isMaster() {
//这里加上一个初始化锁,当没有初始化时,阻塞当前线程
synchronized (initLock) {
if (!isInit) {
try {
initLock.wait();
} catch (InterruptedException e) {

}
}
}
return electionThread.isMaster();
}

class ElectionThread extends Thread {

private boolean isMaster = false;

public ElectionThread() {
setName("leader-election");
}

@Override
public void run() {
while (!stop) {
try {
if (isMaster) {
synchronized (masterLock) {
if (isInit) {
//获得锁且已经初始化过就进入阻塞状态
masterLock.wait();
} else {
//获得锁,还未设置初始化状态,就等待一会儿,给线程机会设置初始化状态
masterLock.wait(Duration.ofSeconds(WAIT_SECONDS).toMillis());
}
}
} else {
isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS);
if (isMaster) {
logger.info("got leadership");
}
}
} catch (InterruptedException e) {

} finally {
//设置线程初始化状态
synchronized (initLock) {
if (!isInit) {
//初始化完成唤醒调用isMaster方法处于阻塞状态的线程,并设置初始化状态
initLock.notifyAll();
isInit = true;
}
}
}
}

if (leaderLock.isLocked() && leaderLock.isHeldByCurrentThread()) {
leaderLock.unlock();
}

if (isMaster) {
isMaster = false;
}
}

public boolean isMaster() {
return isMaster;
}
}
从服务变成主服务

还有一种情况,我们需要考虑,当主服务挂了,从服务变成主服务时,在上面的1小时定时任务已经过了执行时间,现在想在从服务变成主服务时,马上就执行任务,要怎么办?我们可以加上一个从服务变成主服务的监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private List<ElectionListener> listeners = new ArrayList<>();
public interface ElectionListener {
void onElected();
}
public void addElectionListener(ElectionListener electionListener) {
if (listeners.contains(electionListener)) {
return;
}
listeners.add(electionListener);
}
private void notifyElected() {
for (ElectionListener listener : listeners) {
listener.onElected();
}
}

//ElectionThread代码片断
isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS);
if (isMaster) {
logger.info("got leadership");
notifyElected();
}

总结

大功告成,最后看一下完整版的代码

LeaderElection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class LeaderElection {

private static final Logger logger = LoggerFactory.getLogger(LeaderElection.class);

private static final int WAIT_SECONDS = 1;

private RedissonClient redissonClient;

private RLock leaderLock;

private boolean stop = false;

private boolean isInit = false;

private Object masterLock = new Object();

private Object initLock = new Object();

private ElectionThread electionThread = new ElectionThread();

private List<ElectionListener> listeners = new ArrayList<>();

public void tryHold(String leaderName) {
leaderLock = redissonClient.getLock(leaderName);
electionThread.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown()));
}

public boolean isMaster() {
synchronized (initLock) {
if (!isInit) {
try {
initLock.wait();
} catch (InterruptedException e) {

}
}
}
return electionThread.isMaster();
}

public void addElectionListener(ElectionListener electionListener) {
if (listeners.contains(electionListener)) {
return;
}
listeners.add(electionListener);
}

public void shutdown() {
stop = true;
try {
synchronized (masterLock) {
masterLock.notifyAll();
}
electionThread.join();
listeners.clear();
} catch (InterruptedException e) {

}
logger.info("shutdown and give up leadership");
}

class ElectionThread extends Thread {

private boolean isMaster = false;

public ElectionThread() {
setName("leader-election");
}

@Override
public void run() {
while (!stop) {
try {
if (isMaster) {
synchronized (masterLock) {
if (isInit) {
masterLock.wait();
} else {
masterLock.wait(Duration.ofSeconds(WAIT_SECONDS).toMillis());
}
}
} else {
isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS);
if (isMaster) {
logger.info("got leadership");
notifyElected();
}
}
} catch (InterruptedException e) {

} finally {
synchronized (initLock) {
if (!isInit) {
initLock.notifyAll();
isInit = true;
}
}
}
}

if (leaderLock.isLocked() && leaderLock.isHeldByCurrentThread()) {
leaderLock.unlock();
}

if (isMaster) {
isMaster = false;
}
}

public boolean isMaster() {
return isMaster;
}
}

private void notifyElected() {
for (ElectionListener listener : listeners) {
listener.onElected();
}
}

public void setRedissonClient(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}

}
ElectionListener
1
2
3
public interface ElectionListener {
void onElected();
}
在springboot中的配置应用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class LeaderElectionConfig {

@Value("${spring.application.name}")
private String appName;

@Bean(destroyMethod = "shutdown")
public LeaderElection leaderElection(RedissonClient redissonClient) {
LeaderElection leaderElection = new LeaderElection();
leaderElection.setRedissonClient(redissonClient);
leaderElection.tryHold("leader-lock-" + appName);
return leaderElection;
}

}

源码

1
2
https://github.com/huangyemin/wheel
https://gitee.com/huangyemin/wheel
零壹视界 wechat
扫描关注我的微信公众号
喜欢就赞赏一下
0%